---
title: flow_runs
sidebarTitle: flow_runs
---

# `prefect.flow_runs`

## Functions

### `wait_for_flow_run` <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/flow_runs.py#L57" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>

```python
wait_for_flow_run(flow_run_id: UUID, timeout: int | None = 10800, poll_interval: int | None = None, client: 'PrefectClient | None' = None, log_states: bool = False) -> FlowRun
```


Waits for the Prefect flow run to finish and returns the `FlowRun`.

**Args:**
- `flow_run_id`: The flow run ID for the flow run to wait for.
- `timeout`: The wait timeout in seconds. Defaults to 10800 (3 hours).
- `poll_interval`: Deprecated; polling is no longer used to wait for flow runs.
- `client`: Optional Prefect client. If not provided, one will be injected.
- `log_states`: If True, log state changes. Defaults to False.

**Returns:**
- The finished flow run.

**Raises:**
- `prefect.exceptions.FlowWaitTimeout`: If the flow run exceeds the timeout.

**Examples:**

Create a flow run for a deployment and wait for it to finish:
```python
import asyncio

from prefect.client.orchestration import get_client
from prefect.flow_runs import wait_for_flow_run

async def main():
    async with get_client() as client:
        flow_run = await client.create_flow_run_from_deployment(deployment_id="my-deployment-id")
        flow_run = await wait_for_flow_run(flow_run_id=flow_run.id)
        print(flow_run.state)

if __name__ == "__main__":
    asyncio.run(main())
```

Trigger multiple flow runs and wait for them to finish:
```python
import asyncio

from prefect.client.orchestration import get_client
from prefect.flow_runs import wait_for_flow_run

async def main(num_runs: int):
    async with get_client() as client:
        flow_runs = [
            await client.create_flow_run_from_deployment(deployment_id="my-deployment-id")
            for _ in range(num_runs)
        ]
        coros = [wait_for_flow_run(flow_run_id=flow_run.id) for flow_run in flow_runs]
        finished_flow_runs = await asyncio.gather(*coros)
        print([flow_run.state for flow_run in finished_flow_runs])

if __name__ == "__main__":
    asyncio.run(main(num_runs=10))
```


### `pause_flow_run` <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/flow_runs.py#L184" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>

```python
pause_flow_run(wait_for_input: Type[T] | None = None, timeout: int = 3600, poll_interval: int = 10, key: str | None = None) -> T | None
```


Pauses the current flow run by blocking execution until resumed.

When called within a flow run, execution will block and no downstream tasks will
run until the flow is resumed. Task runs that have already started will continue
running. A timeout parameter can be passed that will fail the flow run if it has not
been resumed within the specified time.

**Args:**
- `timeout`: The number of seconds to wait for the flow to be resumed before
failing. Defaults to 1 hour (3600 seconds). If the pause timeout exceeds
any configured flow-level timeout, the flow might fail even after resuming.
- `poll_interval`: The number of seconds between checking whether the flow has been
resumed. Defaults to 10 seconds.
- `key`: An optional key to prevent calling pauses more than once. This defaults to
the number of pauses observed by the flow so far, and prevents pauses that
use the "reschedule" option from running the same pause twice. A custom key
can be supplied for custom pausing behavior.
- `wait_for_input`: A subclass of `RunInput` or any type supported by
Pydantic. If provided when the flow pauses, the flow will wait for the
input to be provided before resuming. If the flow is resumed without
providing the input, the flow will fail. If the flow is resumed with the
input, the flow will resume and the input will be loaded and returned
from this function.

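**Examples:**

Pause the flow run after a task completes successfully:
```python
from time import sleep

from prefect import flow, task
from prefect.client.schemas.objects import StateType
from prefect.flow_runs import pause_flow_run

@task
def task_one():
    for i in range(3):
        sleep(1)

@flow
def my_flow():
    # return_state=True returns the task run's state rather than a future
    terminal_state = task_one.submit(return_state=True)
    if terminal_state.type == StateType.COMPLETED:
        print("Task one succeeded! Pausing flow run..")
        # Fails the flow run if it is not resumed within 2 seconds
        pause_flow_run(timeout=2)
    else:
        print("Task one failed. Skipping pause flow run..")
```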


### `suspend_flow_run` <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/flow_runs.py#L356" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>

```python
suspend_flow_run(wait_for_input: Type[T] | None = None, flow_run_id: UUID | None = None, timeout: int | None = 3600, key: str | None = None, client: 'PrefectClient | None' = None) -> T | None
```


Suspends a flow run by stopping code execution until resumed.

When suspended, the flow run will continue execution until the next task is
orchestrated, at which point the flow will exit. Any tasks that have
already started will run until completion. When resumed, the flow run will
be rescheduled to finish execution. To suspend a flow run in this
way, the flow needs to have an associated deployment and results need to be
configured with the `persist_result` option.

**Args:**
- `flow_run_id`: A flow run ID. If supplied, this function will attempt to
suspend the specified flow run. If not supplied, it will attempt to
suspend the current flow run.
- `timeout`: The number of seconds to wait for the flow to be resumed before
failing. Defaults to 1 hour (3600 seconds). If this timeout
exceeds any configured flow-level timeout, the flow might fail even
after resuming.
- `key`: An optional key to prevent calling suspend more than once. This
defaults to a random string and prevents suspends from running the
same suspend twice. A custom key can be supplied for custom
suspending behavior.
- `wait_for_input`: A subclass of `RunInput` or any type supported by
Pydantic. If provided when the flow suspends, the flow will remain
suspended until receiving the input before resuming. If the flow is
resumed without providing the input, the flow will fail. If the flow is
resumed with the input, the flow will resume and the input will be
loaded and returned from this function.


### `resume_flow_run` <sup><a href="https://github.com/PrefectHQ/prefect/blob/main/src/prefect/flow_runs.py#L464" target="_blank"><Icon icon="github" style="width: 14px; height: 14px;" /></a></sup>

```python
resume_flow_run(flow_run_id: UUID, run_input: dict[str, Any] | None = None) -> None
```


Resumes a paused flow run.

**Args:**
- `flow_run_id`: The ID of the flow run to resume.
- `run_input`: A dictionary of inputs to provide to the flow run.

